/**
 * Copyright (c) 2015, 玛雅牛［李飞］ (myaniu@gmail.com).
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.jfinal.plugin.zbus;

import java.io.IOException;

import com.jfinal.kit.HashKit;

import io.zbus.kit.JsonKit;
import io.zbus.mq.Message;
import io.zbus.mq.Producer;

/**
 * Generic message sender that publishes objects of type {@code T} to a zbus topic.
 *
 * @ClassName: Sender
 * @author 李飞
 * @date 2015-09-05 17:27:58
 * @since V1.0.0
 */
public class Sender<T> {

    /** Character encoding name passed to the JSON serializer when building the message body. */
    private static final String DEFAULT_ENCODING = "UTF-8";

    /**
     * Producer used to publish messages. Created lazily on the first publish and
     * reset to {@code null} by {@link #close()}.
     * <p>
     * Declared {@code volatile} because {@link #ensureProducer(Object)} reads it
     * outside the lock before deciding whether to create it.
     */
    private volatile Producer producer;

    /**
     * Name of the topic every message from this sender is published to.
     */
    private final String topic;

    /**
     * Tag attached to every message. Assigned when the producer is created, derived
     * from the class name of the object that triggered the creation. It is written
     * before the volatile write to {@link #producer}, so a thread that observes the
     * producer also observes the tag.
     */
    private String tag;


    /**
     * Creates a sender bound to the default topic, {@code Constants.DEF_TOPIC}.
     */
    public Sender() {
        this(Constants.DEF_TOPIC);
    }

    /**
     * Creates a sender bound to the given topic.
     *
     * @param topic name of the topic to publish to
     */
    public Sender(String topic) {
        this.topic = topic;
    }

    /**
     * Returns the producer, creating it first if it does not exist yet.
     * <p>
     * The producer reference is returned to the caller instead of being re-read from
     * the field, so that a concurrent {@link #close()} cannot turn it into {@code null}
     * between creation and use.
     *
     * @param obj the message about to be published; its class name is used to derive
     *            the tag when the producer has to be created
     * @return the producer to publish with
     * @throws IOException declared for failures while creating the producer
     * @throws InterruptedException declared for interruption while creating the producer
     * @since V1.0.0
     */
    private Producer ensureProducer(T obj) throws IOException, InterruptedException {
        // Single volatile read for the fast path.
        Producer current = this.producer;
        if (current == null) {
            synchronized (this) {
                // Re-check under the lock: another thread may have created it meanwhile.
                current = this.producer;
                if (current == null) {
                    // Tag is characters 8..23 of the MD5 string of the message class name.
                    // It must be set before the producer is created and published.
                    this.tag = HashKit.md5(obj.getClass().getName()).substring(8, 24);
                    current = ZbusPlugin.createProducer(this, topic);
                    this.producer = current;
                }
            }
        }
        return current;
    }

    /**
     * Drops the current producer reference so that the next publish obtains a new one.
     * This method only clears the field; it does not call anything on the producer itself.
     *
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    void close() throws IOException {
        producer = null;
    }

    /**
     * Publishes a message synchronously. A {@code null} message is ignored.
     *
     * @param obj the message to publish
     * @throws RuntimeException wrapping any exception raised while creating the producer,
     *                          encoding the message, or publishing it; if the cause is an
     *                          {@link InterruptedException} the thread's interrupt flag is
     *                          restored before the exception is thrown
     */
    public void publish(T obj) {
        if (obj == null) {
            return;
        }
        try {
            // Use the returned reference, not the field: close() may null the field concurrently.
            Producer current = ensureProducer(obj);
            current.publish(encode(obj));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still react to it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e.getMessage(), e);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Default message encoding; subclasses may override.
     * <p>
     * Builds a {@link Message} whose body is produced by
     * {@code JsonKit.toJSONBytesWithType(obj, "UTF-8")} and whose topic and tag are
     * those of this sender.
     *
     * @param obj the object to encode
     * @return the message ready to be published
     * @since V1.0.0
     */
    protected Message encode(T obj) {
        Message msg = new Message();
        msg.setBody(JsonKit.toJSONBytesWithType(obj, DEFAULT_ENCODING));
        msg.setTopic(topic);
        msg.setTag(tag);
        return msg;
    }
}
